feat(batch): support batch read iceberg source #15214
Conversation
| GitGuardian id | GitGuardian status | Secret | Commit | Filename | |
|---|---|---|---|---|---|
| 9425213 | Triggered | Generic Password | 11fdfcd | ci/scripts/regress-test.sh | View secret |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secret safely. Learn the best practices here.
- Revoke and rotate this secret.
- If possible, rewrite git history. Rewriting git history is not trivial: you might completely break other contributing developers' workflows, and you risk accidentally deleting legitimate data.

To avoid such incidents in the future, consider:
- following these best practices for managing and storing secrets, including API keys and other credentials;
- installing secret detection on pre-commit to catch secrets before they leave your machine and to ease remediation.

🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

Our GitHub checks need improvement? Share your feedback!
Cluster Info:

**Case 1**

```sql
CREATE source data_gen_source (seq_id bigint, user_id bigint,
    user_name varchar)
WITH (
    connector = 'datagen',
    fields.seq_id.kind = 'sequence',
    fields.seq_id.start = '1',
    fields.seq_id.end = '100000000',
    fields.user_id.kind = 'random',
    fields.user_id.min = '1',
    fields.user_id.max = '10000000',
    fields.user_name.kind = 'random',
    fields.user_name.length = '10',
    datagen.rows.per.second = '500000'
) ROW FORMAT JSON;
```
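As a back-of-envelope check (not stated in the PR), the configured generator rate bounds how long seeding takes: at `datagen.rows.per.second = '500000'`, emitting all `100000000` rows needs about 200 seconds. A minimal sketch:

```python
# Hypothetical sanity check: time for the datagen source above to emit all rows.
total_rows = 100_000_000       # fields.seq_id.end
rows_per_second = 500_000      # datagen.rows.per.second
seconds = total_rows / rows_per_second
print(seconds)                 # 200.0
```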
```sql
create sink sink_iceberg from data_gen_source
with (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'storage',
    warehouse.path = 's3a://dylan-test-iceberg/',
    s3.region = 'us-east-1',
    database.name = 'demo_db',
    table.name = 'demo_table'
);
```
Wait until enough data is written. After compaction, the iceberg table can be read.

**insert select**

```sql
create source xs
    (seq_id bigint, user_id bigint, user_name string)
with (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3a://dylan-test-iceberg/',
    s3.endpoint = 'https://s3.us-east-1.amazonaws.com',
    s3.region = 'us-east-1',
    database.name = 'demo_db',
    table.name = 'demo_table'
);
```
```sql
create table t (seq_id bigint, user_id bigint, user_name varchar);
set batch_enable_distributed_dml = true;
insert into t select * from xs;
```
```
INSERT 0 231331776
Time: 183s
```

about 1.3 million rows/s
about 16 MB/s

**select**

```sql
dev=> select count(*) from xs;
   count
-----------
 231331776
(1 row)

Time: 25s
```

about 9 million rows/s
about 122 MB/s

**Case 2**

```sql
CREATE source data_gen_source_for_big_table (seq_id bigint, user_id bigint,
    user_name varchar, big_column1 varchar, big_column2 varchar, big_column3 varchar)
WITH (
    connector = 'datagen',
    fields.seq_id.kind = 'sequence',
    fields.seq_id.start = '1',
    fields.seq_id.end = '10000000000',
    fields.user_id.kind = 'random',
    fields.user_id.min = '1',
    fields.user_id.max = '10000000000',
    fields.user_name.kind = 'random',
    fields.user_name.length = '10',
    fields.big_column1.kind = 'random',
    fields.big_column1.length = '256',
    fields.big_column2.kind = 'random',
    fields.big_column2.length = '512',
    fields.big_column3.kind = 'random',
    fields.big_column3.length = '1024',
    datagen.rows.per.second = '1000000'
) ROW FORMAT JSON;
```
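To put the MB/s figures for Case 2 in context, a rough per-row payload can be estimated from the column definitions above (an assumption on my part, not a number from the PR; the actual Parquet files will differ after encoding and compression):

```python
# Assumed uncompressed row width for big_table: two 8-byte bigints plus the
# configured varchar lengths. An estimate only, not measured data.
row_bytes = 8 + 8 + 10 + 256 + 512 + 1024  # seq_id, user_id, user_name, big_column1..3
print(row_bytes)  # 1818
```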
```sql
create sink sink_iceberg from data_gen_source_for_big_table
with (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'storage',
    warehouse.path = 's3a://dylan-test-iceberg/',
    s3.endpoint = 'https://s3.us-east-1.amazonaws.com',
    s3.region = 'us-east-1',
    database.name = 'demo_db',
    table.name = 'big_table'
);
```

After compaction, the iceberg table can be read.

**insert select**

```sql
create source big_table_source
    (seq_id bigint, user_id bigint, user_name string, big_column1 varchar, big_column2 varchar, big_column3 varchar)
with (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3a://dylan-test-iceberg/',
    s3.endpoint = 'https://s3.us-east-1.amazonaws.com',
    s3.region = 'us-east-1',
    database.name = 'demo_db',
    table.name = 'big_table'
);
```
```sql
create table t (seq_id bigint, user_id bigint, user_name string, big_column1 varchar, big_column2 varchar, big_column3 varchar);
set batch_enable_distributed_dml = true;
insert into t select * from big_table_source;
```
```
INSERT 0 21077248
Time: 172s
```

about 150k rows/s
about 178 MB/s

**select**

```sql
dev=> select count(*) from big_table_source;
  count
----------
 21077248
(1 row)

Time: 123603.571 ms (02:03.604)
```

about 170k rows/s
about 250 MB/s
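Most of the rows/s figures above can be reproduced directly from the reported row counts and wall-clock times (a sketch; the MB/s figures additionally depend on the on-disk row width):

```python
# Derive rows/s from the counts and times reported in the benchmark above.
runs = {
    "case 1 insert": (231_331_776, 183.0),   # -> ~1.26M rows/s
    "case 1 select": (231_331_776, 25.0),    # -> ~9.25M rows/s
    "case 2 select": (21_077_248, 123.604),  # -> ~170k rows/s
}
for name, (rows, secs) in runs.items():
    print(f"{name}: {rows / secs:,.0f} rows/s")
```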
The rest LGTM.
Additionally, is it possible to check and ensure this?
The table properties set by e.g. Spark could be read by RisingWave (icelake). It seems to be not just a hint for a specific connector or SDK. cc @liurenjie1024
How about we merge this after #14885?
LGTM, thanks!
Both are okay to me. Conflicts should be easy to resolve.
We may need to change some tests.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
**Checklist**

- `./risedev check` (or alias, `./risedev c`)
- Documentation

**Release note**

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

`catalog.type = 'storage'` is supported in the iceberg source.

Example: